导航菜单
首页 >  netty三万字详解JAVA高性能通信框架关于netty  > 【netty】三万字详解!JAVA高性能通信框架,关于netty,看这一篇就够了

【netty】三万字详解!JAVA高性能通信框架,关于netty,看这一篇就够了

bytebuf自动扩容,而bytebuffer不行。

8.4.操作8.4.1.读写

写操作:

import io.netty.buffer.ByteBuf;import io.netty.buffer.ByteBufAllocator;import java.nio.charset.StandardCharsets;public class test {public static void main(String[] args) {//写入数字ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();buffer.writeInt(666);// 写入字符串String stringValue = "Hello, World!";byte[] stringBytes = stringValue.getBytes(StandardCharsets.UTF_8);buffer.writeBytes(stringBytes);}}

读操作:

// 读取整数int readIntValue = buffer.readInt();// 读取字符串int readableBytes = buffer.readableBytes();byte[] stringBytes = new byte[readableBytes];buffer.readBytes(stringBytes);String readStringValue = new String(stringBytes, StandardCharsets.UTF_8);

需要注意的是,在读取数据之前,你需要确保 ByteBuf 中有足够的可读字节数。可以使用 readableBytes() 方法来检查 ByteBuf 中的可读字节数。

此外,ByteBuf 还提供了其他的读写操作,比如 readableBytes() 用于获取可读字节数,writerIndex() 和 readerIndex() 用于获取写入和读取的索引位置等。在使用 ByteBuf 时,请确保在读写时不越界,并且注意释放 ByteBuf 以避免内存泄漏。在Netty中,通常会使用 ReferenceCountUtil.release(buffer) 来释放 ByteBuf,确保资源得到正确释放。

读写指针:

8.4.2.释放

bytebuf要特别注意资源的释放,以避免内存泄漏。Netty使用引用计数(Reference Counting)来管理 ByteBuf 的生命周期,确保在不再需要使用时及时释放资源。

在Netty中,release() 和 retain() 是用于管理 ByteBuf 引用计数的方法。

release() 方法用于将 ByteBuf 的引用计数减少1。当引用计数减至0时,Netty会释放 ByteBuf 的内存(如果使用了池化的 ByteBuf,则将它归还给池)。

ByteBuf buffer = //... 从某个地方获取ByteBuf实例buffer.release(); // 引用计数减少1,如果引用计数为0,释放ByteBuf的内存

retain() 方法用于将 ByteBuf 的引用计数增加1。当你调用 retain() 方法时,你告诉Netty你对这个 ByteBuf 感兴趣,即使在你使用完后,其他代码也可能继续使用它。

ByteBuf buffer = //... 从某个地方获取ByteBuf实例buffer.retain(); // 引用计数增加1,防止在使用完后被提前释放8.5.零拷贝

零拷贝其实没有严格的定义,指的是减少IO过程中数据在内存中拷贝的次数这样一个大致目标。在netty的ByteBuf中也存在一些零拷贝机制,用来在多个ByteBuf之间进行数据传递。

8.5.1.slice

在 Netty 中,ByteBuf 的 slice() 方法用于创建一个与原始 ByteBuf 共享数据的新 ByteBuf。换句话说,slice() 方法返回一个从原始 ByteBuf 中截取出来的视图,这个视图与原始 ByteBuf 共享底层数据,但拥有自己的独立读写指针。由于是直接通过读写指针指向同一块内存的,所以slice出来的bytebuf并没有发送数据拷贝,是0拷贝。

如何理解拥有自己的独立读写指针喃?因为slice出来的buf和元buf共享内存,为了避免slice出来的buf通过写指针来进行写,进而影响元buf,netty在设计时故意就禁止了slice动用写指针来向元buf中进行写。只能通过读指针来读。

import io.netty.buffer.ByteBuf;import io.netty.buffer.ByteBufAllocator;public class test {public static void main(String[] args) {//开一个容量为10字节的ByteBufByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);//写入数据buffer.writeBytes(new byte[]{1,2});//sliceByteBuf slice = buffer.slice(0,2);//因为slice不与元buf共享读写指针,所以write会报错,因为write是用的读写指针来进行读写,但是set不会报错,因为set不是用的读写指针来进行读写的。//slice.writeByte(1);slice.setByte(0,2);while(slice.isReadable()){System.out.println(slice.readByte());}}}8.5.2.composite

slice是将一个大的bytebuf划分成多个小的bytebuff,composite是将多个小的bytebuf聚合成一个大的bytebuf。

在 Netty 中,CompositeByteBuf 是 ByteBuf 的一个特殊实现,它提供了一种能够组合多个 ByteBuf 实例的方式。CompositeByteBuf 允许将多个 ByteBuf 视为一个单一的逻辑缓冲区,而不需要将它们合并成一个实际的连续内存块。这种设计可以提高内存的利用率和降低内存拷贝的次数。

public static void main(String[] args) {ByteBuf buffer1 = ByteBufAllocator.DEFAULT.buffer();buffer1.writeBytes(new byte[]{1,2,3,4,5});ByteBuf buffer2 = ByteBufAllocator.DEFAULT.buffer();buffer2.writeBytes(new byte[]{6,7,8,9,10});CompositeByteBuf compositeBuffer = ByteBufAllocator.DEFAULT.compositeBuffer();//可变参数,可以有多个compositeBuffer.addComponents(buffer1,buffer2);while (compositeBuffer.isReadable()){System.out.println(compositeBuffer.readByte());}}8.6.工具类

Unpooled 是 Netty 提供的一个工具类,用于创建不需要池化的 ByteBuf 实例。在 Netty 中,ByteBuf 是用来操作字节数据的缓冲区类。通常,Unpooled 类提供了一些静态方法,用于创建不同类型的 ByteBuf 实例,包括堆缓冲区(heap buffer)、直接缓冲区(direct buffer)、组合缓冲区(composite buffer)等。

也就是说可以用unpooled来开辟各类型的bytebuf。

9.双向通信

服务端:

import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;public class Server {public static void main(String[] args) {//ServerBootstrap,启动器,负责组装netty组件new ServerBootstrap()//1.怎样去接收IO?//事件组,事件组里面包含thread和selector,可以理解为netty种用来选择IO的组件.group(new NioEventLoopGroup())//2.接收成什么?//服务器ServerSocketChannel实现,由于上面用的Nio的事件组,所选nio的//除此以外,还支持BIO和特定操作系统的,如Linux的EpollServerSocketChannel.channel(NioServerSocketChannel.class)//3.做什么处理?//支持用责任链模式来对收到的IO进行链式处理.childHandler(new ChannelInitializer() {//连接建立后才会调用初始化方法@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {//指定解码方式nioSocketChannel.pipeline().addLast(new StringDecoder());//ChannelInboundHandlerAdapter接口是netty种让用户自定义handler的接口nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg);ByteBuf response = ctx.alloc().buffer();response.writeBytes(msg.toString().getBytes());ctx.writeAndFlush(response);}});}})//4.绑定监听端口.bind(8080);}}

客户端:

import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringEncoder;import java.net.InetSocketAddress;public class Client {public static void main(String[] args) throws InterruptedException {new Bootstrap().group(new NioEventLoopGroup())//用什么进行发送?//可以是BIO,也可以是NIO,也可以是epoll.channel(NioSocketChannel.class)//处理器.handler(new ChannelInitializer() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {//指定编码方式nioSocketChannel.pipeline().addLast(new StringEncoder());nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ByteBuf buffer = ctx.alloc().buffer();buffer.writeBytes("hello".getBytes());ctx.writeAndFlush(buffer);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg.toString());}});}})//连接到服务器.connect(new InetSocketAddress("localhost", 8080));}}10.粘包半包10.1.问题成因

粘包:发送abc def,接收到abcdef

半包:发送abcdef,接收到abc或者def

原因:

没有清晰的结束符,导致不知道收到何处才是一个完成的包。

IO缓冲区大小过大或者过小,导致收太多或者收不完。

解决粘包和半包问题通常需要在设计通信协议时采取一些策略。

10.2.解决办法10.2.1.短连接

解决粘包半包问题的其中一个办法是——短连接。

所谓短连接就是当一次完整的报文返送完成后,客户端主动断开TCP连接。粘包半包的根本原因其实就是不知道一个完整的报文何时收完,通过客户端发送完一次完整的信息后主动断开连接,让服务器端感知到,一次完整的信息发送完成。

客户端:

import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;public class Client {public static void main(String[] args) throws InterruptedException {for (int i=0;i() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {//指定编码方式nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ByteBuf buffer = ctx.alloc().buffer();buffer.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10});ctx.writeAndFlush(buffer);ctx.channel().close();}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();}catch(Exception e){e.printStackTrace();}finally {worker.shutdownGracefully();}}}

服务器:

import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;public class Server {public static void main(String[] args) {NioEventLoopGroup boss=new NioEventLoopGroup();NioEventLoopGroup worker=new NioEventLoopGroup();try{ServerBootstrap serverBootstrap=new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss,worker);serverBootstrap.childHandler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg);}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();}catch (Exception e){e.printStackTrace();}finally {boss.shutdownGracefully();worker.shutdownGracefully();}}}10.2.2.解码器1.概述

解码器是netty自带的一类用来从请求报文中解析出数据的handler。其底层原理都是从指定位置开始,解析出定长的字节内容来。

2.定长解码器

FixedLengthFrameDecoder,定长解码器,用来在报文中获取出指定长度的字节。

server:

import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.FixedLengthFrameDecoder;import java.nio.charset.StandardCharsets;public class server {public static void main(String[] args) throws InterruptedException {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// 定长解码器,每个消息长度固定为10个字节pipeline.addLast(new FixedLengthFrameDecoder(10));// 业务处理器pipeline.addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf byteBuf = (ByteBuf) msg;String content = byteBuf.toString(StandardCharsets.UTF_8);System.out.println("Received message: " + content);byteBuf.release(); // 释放ByteBuf资源}});}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();}}3.行解码器

Netty的行处理器(LineBasedFrameDecoder)是一种用于处理以换行符(\n)或回车换行符(\r\n)为消息分隔符的情况。它会按照换行符或回车换行符将接收到的数据切分成消息,适用于处理文本协议中每行代表一个消息的场景。

server:

import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.handler.codec.LineBasedFrameDecoder;public class LineBasedServerHandler extends ChannelInboundHandlerAdapter {public LineBasedServerHandler() {// 添加行处理器到ChannelPipeline中,使用换行符作为消息分隔符ctx.pipeline().addLast(new LineBasedFrameDecoder(1024));}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 这里的msg是一个ByteBuf,表示一个完整的行消息ByteBuf buf = (ByteBuf) msg;String line = buf.toString(io.netty.util.CharsetUtil.UTF_8);System.out.println("Received message: " + line);buf.release(); // 释放ByteBuf资源}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}}

客户端:

import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class LineBasedClient {public static void main(String[] args) throws InterruptedException {Bootstrap bootstrap = new Bootstrap();bootstrap.group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// 添加行处理器到ChannelPipeline中,使用换行符作为消息分隔符pipeline.addLast(new LineBasedFrameDecoder(1024));// 客户端的业务处理器pipeline.addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) {// 发送带换行符的消息String message = "Hello, Netty!\n";ctx.writeAndFlush(message);}});}});bootstrap.connect("localhost", 8080).sync().channel().closeFuture().sync();}}4.固定帧长的解码器

Netty中的LengthFieldBasedFrameDecoder是一种用于解决粘包和半包问题的解码器。通信报文的结构说白了无非就是头部+身体,头部中记录关于消息长度等信息,身体中携带要传递的消息。LengthFieldBasedFrameDecoder就是根据设置的参数来准确的切分消息的头部和身体,就能确保每个消息被正确地接收和处理。

构造方法如下:

public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip)

maxFrameLength:指定消息的最大长度,超过这个长度的消息将被丢弃。

lengthFieldOffset:指定长度字段在消息中的偏移量。

lengthFieldLength:指定长度字段的长度,可以是1、2、3、4、8等字节。

lengthAdjustment:指定长度字段的值需要进行调整的偏移量,通常为消息头的长度。

initialBytesToStrip:指定解码时需要跳过的字节数,通常为长度字段的长度。

代码示例:

import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;public class LengthFieldServerHandler extends ChannelInboundHandlerAdapter {public LengthFieldServerHandler() {// 添加LengthFieldBasedFrameDecoder,指定各个参数ctx.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 这里的msg是一个ByteBuf,表示一个完整的消息ByteBuf buf = (ByteBuf) msg;String message = buf.toString(io.netty.util.CharsetUtil.UTF_8);System.out.println("Received message: " + message);buf.release(); // 释放ByteBuf资源}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}}11.协议解析11.1.Redis11.2.Http

名字里带codec的,在业内基本都是编解码器。

import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.http.HttpContent;import io.netty.handler.codec.http.HttpRequest;import io.netty.handler.codec.http.HttpServerCodec;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;public class TestHttp {public static void main(String[] args) {NioEventLoopGroup boss=new NioEventLoopGroup();NioEventLoopGroup worker=new NioEventLoopGroup();try{ServerBootstrap serverBootstrap=new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss,worker);serverBootstrap.childHandler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));//Http的解码器socketChannel.pipeline().addLast(new HttpServerCodec());socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//经过http解码器解码后,请求会被解析为请求头实体或者请求体实体if(msg instanceof HttpRequest){//请求行、请求头}else if(msg instanceof HttpContent){//请求体}}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();}catch (Exception e){e.printStackTrace();}finally {boss.shutdownGracefully();worker.shutdownGracefully();}}}

当然上面这种写法太繁琐了,netty提供了SimpleChannelInboundHandler,用泛型来指定处理请求头还是请求体:

import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInitializer;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.http.*;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;public class TestHttp {public static void main(String[] args) {NioEventLoopGroup boss=new NioEventLoopGroup();NioEventLoopGroup worker=new NioEventLoopGroup();try{ServerBootstrap serverBootstrap=new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss,worker);serverBootstrap.childHandler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));//Http的解码器socketChannel.pipeline().addLast(new HttpServerCodec());socketChannel.pipeline().addLast(new SimpleChannelInboundHandler() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);byte[] bytes = "hello world!".getBytes();//响应头设置返回的消息的长度,否则浏览器不知道消息有多长,会一直刷新response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH,bytes.length);//响应体设置返回的消息response.content().writeBytes(bytes);//写回响应ctx.writeAndFlush(response);}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();}catch (Exception e){e.printStackTrace();}finally {boss.shutdownGracefully();worker.shutdownGracefully();}}}12.协议设计12.1.概述

自定义协议要素:

魔术,用来判断数据包是否有效。

版本号,协议版本号,用来支持协议升级。

序列化算法,消息正文采用的序列化方式。

指令类型,是登录、注册、还是其他........

请求序号,用来支持双工通信,如TCP之类的。

正文长度

消息正文

12.2.编码

编解码,netty自带编解码器接口ByteToMessageCodec,允许开发者将数据报文转为自己想要的类型。

注意:想要转为的目标类型,必须是实现了序列化接口,可序列化的,不然会报错。

public class MyCodec extends ByteToMessageCodec {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {//4字节的魔数out.writeBytes(new byte[]{1, 2,3,4});//1字节的版本out.writeByte(1);//1字节的序列化方式jdkout.writeByte(0);//1字节的指令类型out.writeByte(msg.getMessageType());//4个字节序号out.writeInt(msg.getSequenceId());//填充字段out.writeByte(0xff);//消息内容ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();//消息长度out.writeInt(bytes.length);//写入内容out.writeBytes(bytes);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List

测试:

public static void main(String[] args) throws Exception {EmbeddedChannel channel=new EmbeddedChannel(new LoggingHandler(),new MyCodec());Message message=new Message();message.setData("hello".getBytes());//出站会调用codec的encode()channel.writeOutbound(message);ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();new MyCodec().encode( null,message, buf);//入站会调用codec的decode()channel.writeInbound(buf);}

测试半包、粘包问题:

public static void main(String[] args) throws Exception {EmbeddedChannel channel=new EmbeddedChannel(new LengthFieldBasedFrameDecoder(1024,12,4,0,0),new LoggingHandler(),new MyCodec());Message message=new Message();message.setData("hello".getBytes());ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();new MyCodec().encode( null,message, buf);ByteBuf s1=buf.slice(0,100);ByteBuf s2=buf.slice(100,buf.readableBytes()-100);//writeInbound后ByteBuf的引用计数会被-1,导致ByteBuf被释放掉,这里需要手动维持一下s1.retain();channel.writeInbound(s1);channel.writeInbound(s2);}

相关推荐: